package com.lg.monitor

import java.sql.{Connection, DriverManager}

import com.lg.bean.BusInfo
import org.apache.spark.sql.ForeachWriter

object MysqlWriter {
  val driver = "com.mysql.cj.jdbc.Driver"
  var connection: Connection = _

  def getConnection(host: String,
                    port: Int,
                    user: String,
                    pwd: String,
                    db: String): Connection = {
    val url: String = "jdbc:mysql://" + host + ":" + port + "/" + db + "?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull&tinyInt1isBit=false"
    Class.forName(driver)
    connection = DriverManager.getConnection(url, user, pwd)
    connection
  }

  class MysqlWriter extends ForeachWriter[BusInfo] {
    val insertSQL: String =
      "INSERT INTO `lg_logistics`.`bus_info` (`lglat`, `deployNum`) VALUES (?, ?);"

    override def open(partitionId: Long, epochId: Long): Boolean = {
      MysqlWriter.getConnection("linux123",
        3306,
        "root",
        "12345678",
        "lg_logistics") != null
    }

    override def process(value: BusInfo): Unit = {
      val preparedStatement = connection.prepareStatement(insertSQL)
      val lglat: String = value.lglat
      val deployNum = value.deployNum
      preparedStatement.setString(1, lglat)
      preparedStatement.setString(2, deployNum)
      println("insert执行完毕(" + lglat + " " + deployNum + ")")

    }

    override def close(errorOrNull: Throwable): Unit = {
      if (connection != null) {
        connection.close()
      }
    }
  }
}
